
In the previous notebooks we've been using dask.distributed.LocalCluster to split up our tasks and run them on the same compute node that is running the notebook. We noted that this could then use all cores to run tasks in parallel, greatly speeding up loading, and thanks to chunks we can also process some algorithms, like our NDVI seasonal mean, on datasets larger than available RAM.
But what happens if your algorithm and dataset cannot fit in the compute node's RAM, or the result of the calculation is itself massive, or the job is just so big (in memory and computation) that it takes hours to compute?
Well, dask.distributed.LocalCluster is just one member of the dask.distributed cluster family. There are several others, but the one we will be using is the Kubernetes cluster (KubeCluster). Kubernetes is an excellent technology that takes advantage of modern cloud computing architectures to automatically provision (and remove) compute nodes on demand. It does a lot of other things well beyond the scope of this dask tutorial, of course. The important point is that with KubeCluster we can dramatically expand the number of Compute Nodes available to schedule our dask Workers on - potentially very dramatically.
In this notebook we'll expand our NDVI seasonal mean calculation to a larger area and take it back through two decades of observations. Along the way we'll explore the dask data structures, how computation proceeds, and what we can do to tune the performance. At this spatial size we'll also look at how to interactively visualise a result that is larger than the Jupyter notebook can handle.
Everything we do next builds on the concepts of chunks, tasks, data locality, and task graph covered previously.
Once we have multiple compute nodes to run workers on, we have a lot more moving parts in the system. These parts are also fully distributed and need to communicate with each other to pass results, perform tasks, confirm completion of tasks, ask for more work, and so on. It is important to understand what the components are and how they interact because this can impact both the performance and the stability of a calculation, particularly at very large scales. In addition, data locality really matters when your dataset is large - you don't want to compute() a 1 TB result and have it brought back to the Jupyter kernel on a 32 GiB machine! There are also subtleties to be aware of in how data gets from your Jupyter notebook to the dask distributed nodes - it has to be communicated somehow.
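A quick sanity check before calling compute() is to estimate the size of the result from its shape and dtype. A minimal sketch - the shape, dtype size, and helper name below are hypothetical, purely for illustration (on a real dask-backed xarray object the same figure is available directly as `result.nbytes`):

```python
# Rough size estimate for a dense result, before calling compute().
# `result_size_gib` is a hypothetical helper; shape and itemsize are made up.
def result_size_gib(shape, itemsize):
    """GiB needed to hold a dense array of `shape` with `itemsize`-byte elements."""
    n = 1
    for dim in shape:
        n *= dim
    return n * itemsize / 2**30

# e.g. a float64 result of 4 seasons x 64000 x 64000 pixels:
size = result_size_gib((4, 64000, 64000), 8)
print(f"{size:.1f} GiB")  # → 122.1 GiB - far more than a 32 GiB notebook kernel
```

If the estimate dwarfs the notebook kernel's RAM, write the result to storage from the workers instead of pulling it back with compute().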
This can all be a bit overwhelming to think about. Thankfully, you don't hit all of these at once, as dask does a good job of hiding many of the details. But remember our two "laws" of dask from the first notebook:
The transition point from gain to pain, and back to gain, is connected to these details. So let's define our various parts and their roles and start building our knowledge.
In a Kubernetes environment, every program runs in a Pod. What a Pod is and how it works is a subject for another course, and you can use an internet search to find out more. For our purposes it is sufficient to understand that the Jupyter notebook, the dask scheduler, the dask workers, and all the components that make this work run in Pods.
Pods run on Compute Nodes - physical hardware with an actual CPU, GPU, memory and storage. A Compute Node can run more than one Pod so long as the sum of all the Pods' requests fits. For example, if your Compute Node has 64 GiB of RAM and your worker Pods request 14 GiB each, then 4 worker Pods will run on 1 Compute Node.
Thankfully, you don't need to figure out what Pods get placed where as Kubernetes will do that automatically. There is more to be said about this relationship and the impacts on performance and operational cost but for now just note that Pods are where your code and data lives and they have requests and limits which you can control.
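The packing arithmetic above can be sketched in a couple of lines (the node and pod sizes are the hypothetical figures from the example, not values from any real cluster):

```python
# How many worker Pods fit on one Compute Node, by memory request alone.
# Figures are the hypothetical ones from the example above.
node_ram_gib = 64        # total RAM on the Compute Node
worker_request_gib = 14  # memory request per worker Pod

workers_per_node = node_ram_gib // worker_request_gib
print(workers_per_node)  # → 4 (with 64 - 4*14 = 8 GiB left for other Pods)
```

In practice Kubernetes also accounts for CPU requests and system overheads, so the real packing can be tighter than this simple division suggests.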
This diagram shows a single user (Joe) running a Jupyter Notebook and connected to a single dask cluster with 5 Workers (running on 3 Worker Nodes).
The Jupyter notebook is where your code is typed in. It has a Python kernel of its own and will be the dask client that talks to the dask cluster. It is running in a Pod, as are all the components. So the Jupyter notebook is separate from the other components in the system and communicates over a network.
The dask cluster is the dask scheduler plus the group of dask workers that process the tasks in a distributed manner. The scheduler and the workers are all Pods, which means they are separate from each other and communicate over a network. This is different to dask.distributed.LocalCluster in which the Jupyter notebook, dask scheduler and workers all resided on the same machine and all communicated very rapidly on the local machine's communications channel. Now they can be on entirely different compute nodes and are communicating over a much slower network. We have the benefit of more compute resources, at the cost of slower communication.
The dask gateway is a new component and is used to manage dask clusters (note the plural). The Jupyter notebook acts as a client to the dask gateway, making requests to it to create and destroy dask clusters (both the scheduler and the workers). The dask gateway manages the lifecycle of the clusters on the user's behalf.
Tip: This means the dask clusters have an independent life cycle compared with the Jupyter notebook. Quitting your Jupyter notebook will not necessarily quit your dask cluster.
Moreover, you can have more than one Jupyter notebook talking to the same dask cluster simultaneously. There are some good reasons for doing this but we won't be touching on them in this course.
Let's move our NDVI seasonal mean from the LocalCluster to our dask gateway managed cluster, and add some extra compute resources to it.
The biggest change here is simply how we start and shutdown the dask cluster. The rest of the code, to do the actual computation, is exactly the same.
The first thing we need to do is create a client so we can connect to the dask gateway.
# Initialize the Gateway client
from dask.distributed import Client
from dask_gateway import Gateway
gateway = Gateway()
Easy! We now have a gateway client variable. Using this we can start clusters, stop clusters, list the clusters we have running, and set options for our scheduler and workers (like CPU and memory requests).
Let's see what the cluster_options are. We don't need to guess; we can ask the gateway.
gateway.cluster_options()
Options<worker_cores=8,
worker_threads=8,
worker_memory=28.0,
cuda_worker=False,
node_selection='worker',
service_account='default',
image='444488357543.dkr.ecr.ap-southeast-2.amazonaws.com/easi-dask-noml:master.latest',
image_pull_policy='Always',
db_name='easihub_csiro_db',
db_username='easi_db_user',
db_password='',
aws_access_key_id='',
aws_secret_access_key='************',
aws_session_token='',
easi_user_allocation='R-14062-01'>
That's a lot, I know. Most of these don't need to change, and most users will simply tweak the worker parameters: cores, threads (usually kept equal to cores), memory, and the worker group.
We will be using the defaults for now.
Create the cluster with default options if it doesn't already exist. If a cluster exists in your namespace, the code below will connect to the first cluster. List the available clusters with gateway.list_clusters().
Cluster creation may take a little while (minutes) if a suitable node isn't available for the scheduler. The same will occur for workers when they start. If a node already exists then this can happen in seconds.
Tip: Users are often confused by the variable start-up time and think something is wrong.
It can take minutes for a brand new node to be provisioned, so please be patient. If it takes more than 10 minutes then yes, something may be wrong, and you should contact an administrator of the system if the problem persists.
clusters = gateway.list_clusters()
if not clusters:
    print('Creating new cluster. Please wait for this to finish.')
    cluster = gateway.new_cluster()
else:
    print(f'An existing cluster was found. Connecting to: {clusters[0].name}')
    cluster = gateway.connect(clusters[0].name)
cluster
Creating new cluster. Please wait for this to finish.
Use the GatewayCluster widget (above) to adjust the cluster size. Alternatively use the cluster API methods.
For many tasks 1 or 2 workers will be sufficient, although for larger areas or more complex tasks 5 to 10 workers may be used. If you are new to Dask, start with one worker and then scale your cluster if needed.
In this notebook we'll start with 4 workers - that's 4x the resources for workers compared to our previous LocalCluster. In addition the scheduler is also on its own node, and so is the Jupyter notebook kernel. Lots more resources for all the components involved.
The next cell will use the cluster API to add 4 workers programmatically.
cluster.scale(4)
To connect to your cluster and start doing work, use the get_client() method. The wait_for_workers() call will block until the workers are ready. You don't actually have to wait: the Jupyter notebook can do other things while the workers are coming up. We wait in this example so you don't hit an unexpected delay later.
*This may take a few minutes before your workers will be ready to use.*
client = cluster.get_client()
client.wait_for_workers(n_workers=4)
client
Client-f09efecd-0421-11ee-8975-6e0d79a27f15
| Connection method: Cluster object | Cluster type: dask_gateway.GatewayCluster |
| Dashboard: https://hub.csiro.easi-eo.solutions/services/dask-gateway/clusters/easihub.1a825e20165046e5a89c2d76fd78ecbf/status |
The client widget provides a clickable dask dashboard link, so click that and you'll see your dashboard. It works the same as before, despite the fact that everything is now running in a distributed fashion. If you click the Workers tab in the dashboard you will see that we now have 32 cores (up from 8), made up of 4x 8-core workers. Lots of RAM too.
Go back to the Status tab so you can watch everything run.
We don't need to change any of our code to run this now, so let's repeat the full calculation.
import git
import sys, os
from dateutil.parser import parse
from dateutil.relativedelta import relativedelta
from dask.distributed import Client, LocalCluster
import datacube
from datacube.utils import masking
from datacube.utils.aws import configure_s3_access
# EASI defaults
os.environ['USE_PYGEOS'] = '0'
repo = git.Repo('.', search_parent_directories=True).working_tree_dir
if repo not in sys.path: sys.path.append(repo)
from easi_tools import EasiDefaults, notebook_utils
easi = EasiDefaults()
Successfully found configuration for deployment "csiro"
dc = datacube.Datacube()
configure_s3_access(aws_unsigned=False, requester_pays=True, client=client);
# Get the centroid of the coordinates of the default extents
central_lat = sum(easi.latitude)/2
central_lon = sum(easi.longitude)/2
# central_lat = -42.019
# central_lon = 146.615
# Set the buffer to load around the central coordinates
# This is a radial distance: the bbox spans 2x the buffer in each dimension
buffer = 0.8
# Compute the bounding box for the study area
study_area_lat = (central_lat - buffer, central_lat + buffer)
study_area_lon = (central_lon - buffer, central_lon + buffer)
# Data product
products = easi.product('landsat')
# Set the date range to load data over
set_time = easi.time
set_time = (set_time[0], parse(set_time[0]) + relativedelta(years=1))
# set_time = ("2021-01-01", "2021-12-31")
# Selected measurement names (used in this notebook)
nir = easi.nir('landsat') # If defined, else None
red = easi.red('landsat') # If defined, else None
if nir is None: nir = 'nir08' # USGS Landsat
if red is None: red = 'red' # USGS Landsat
# Set the QA band name and mask values
good_pixel_flags = { # USGS Landsat
'nodata': False,
'cloud': 'not_high_confidence',
'cloud_shadow': 'not_high_confidence',
'water': 'land_or_cloud'
}
qa_band = easi.qa_band('landsat') # If defined, else None
qa_mask = easi.qa_mask('landsat') # If defined, else None
if qa_band is None: qa_band = 'qa_pixel' # USGS Landsat
if qa_mask is None: qa_mask = good_pixel_flags
# Set the measurements/bands to load. `None` will load all of them
measurements = None
# Set the resampling method for the bands
resampling = {qa_band: "nearest", "*": "average"}
# Set the coordinate reference system and output resolution
set_crs = easi.crs('landsat') # If defined, else None
set_resolution = easi.resolution('landsat') # If defined, else None
# set_crs = "epsg:3577"
# set_resolution = (-30, 30)
# Set the scene group_by method
group_by = "solar_day"
dataset = None # clear results from any previous runs
dataset = dc.load(
product=products,
x=study_area_lon,
y=study_area_lat,
time=set_time,
measurements=measurements,
resampling=resampling,
output_crs=set_crs,
resolution=set_resolution,
dask_chunks = {"time":1, "x":2048, "y":2048}, ## No change here, chunking spatially just like before
group_by=group_by,
)
print(f"dataset size (GiB) {dataset.nbytes / 2**30:.2f}")
dataset size (GiB) 88.87
# Identify pixels that are either "valid", "water" or "snow"
cloud_free_mask = masking.make_mask(dataset[qa_band], **qa_mask)
# Apply the mask
cloud_free = dataset.where(cloud_free_mask)
# Calculate the components that make up the NDVI calculation
band_diff = cloud_free[nir] - cloud_free[red]
band_sum = cloud_free[nir] + cloud_free[red]
# Calculate NDVI, ta da
ndvi = None
ndvi = band_diff / band_sum
ndvi_unweighted = ndvi.groupby("time.season").mean("time") # Calculate the seasonal mean
%%time
actual_result = ndvi_unweighted.compute()
CPU times: user 1.57 s, sys: 728 ms, total: 2.3 s Wall time: 57.8 s
As before there will be a short delay as the Jupyter kernel (client) is used to optimize the task graph before sending it to the scheduler which will then execute tasks on the workers.
Tip: You can open a terminal and use htop to monitor the Jupyter notebook CPU usage.
You'll see at least one core near 100% usage during the optimisation phase. It will then drop back to idle as the task graph is sent to the scheduler, at which point the dashboard will show activity on the cluster.
actual_result.sel(season='DJF').plot()
<matplotlib.collections.QuadMesh at 0x7f372947e3b0>
Before we "Go Big" let's take advantage of our new resources.
We've gained memory. The more memory we have, the more data we can operate on at once and the less we need to communicate between nodes. Communication over a network is slow relative to local communication within a pod. So if we change the chunking we may see an improvement in performance.
Chunking will also impact the number of tasks - the fewer chunks, the fewer tasks. This in turn will impact how much task graph optimization is required, how hard the scheduler has to work, and how much communication of partial results goes between workers (for example, passing the partial means around to get a final mean).
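The arithmetic here can be checked directly. In this sketch, `chunk_stats` is a hypothetical helper (not a dask function) that computes the chunk count and per-chunk memory for the array shape used in this notebook:

```python
import math

def chunk_stats(shape, chunks, itemsize):
    """Number of chunks, and per-chunk size in MiB, for a dask-style chunking."""
    n_chunks = 1
    for dim, c in zip(shape, chunks):
        n_chunks *= math.ceil(dim / c)  # partial chunks at the edges still count
    chunk_mib = math.prod(chunks) * itemsize / 2**20
    return n_chunks, chunk_mib

# int16 Landsat band (itemsize 2), shape as loaded above
print(chunk_stats((43, 6441, 5469), (1, 2048, 2048), 2))  # → (516, 8.0)
print(chunk_stats((43, 6441, 5469), (2, 3072, 3072), 2))  # → (132, 36.0)
```

Going from (1, 2048, 2048) to (2, 3072, 3072) cuts the chunk count roughly fourfold, and every chunk saved is one fewer task per operation in the graph.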
Let's look at our existing chunk size - (1,2048,2048)
dataset[red]
<xarray.DataArray 'nbart_red' (time: 43, y: 6441, x: 5469)>
dask.array<dc_load_nbart_red, shape=(43, 6441, 5469), dtype=int16, chunksize=(1, 2048, 2048), chunktype=numpy.ndarray>
Coordinates:
* time (time) datetime64[ns] 2020-02-08T23:56:31.774346 ... 2021-01...
* y (y) float64 -3.922e+06 -3.922e+06 ... -4.115e+06 -4.115e+06
* x (x) float64 1.272e+06 1.272e+06 ... 1.436e+06 1.436e+06
spatial_ref int32 3577
Attributes:
units: 1
nodata: -999
crs: EPSG:3577
grid_mapping: spatial_ref
Each red chunk occupies 8 MiB. Of course this will vary with data type and stage of computation, so when tuning dask you may need to check the chunk size and task count as your computation transforms the data. We're doing a simple computation here, so we can focus on this initial value. With experience it gets easier to figure out when and where this parameter needs further adjustment. We'll look at some of this later.
For now there are some things we can observe:
So we have good reason to increase our chunks both spatially and temporally; just be mindful of the impact on communication of results between nodes. The mean is seasonal and Landsat 8 performs repeat passes nominally every 16 days, so let's do a small grouping in time. We'll also increase the spatial chunking slightly.
dataset = None # clear results from any previous runs
dataset = dc.load(
product=products,
x=study_area_lon,
y=study_area_lat,
time=set_time,
measurements=measurements,
resampling=resampling,
output_crs=set_crs,
resolution=set_resolution,
dask_chunks = {"time":2, "x":3072, "y":3072}, # This line has changed
group_by=group_by,
)
dataset[red]
<xarray.DataArray 'nbart_red' (time: 43, y: 6441, x: 5469)>
dask.array<rechunk-merge, shape=(43, 6441, 5469), dtype=int16, chunksize=(2, 3072, 3072), chunktype=numpy.ndarray>
Coordinates:
* time (time) datetime64[ns] 2020-02-08T23:56:31.774346 ... 2021-01...
* y (y) float64 -3.922e+06 -3.922e+06 ... -4.115e+06 -4.115e+06
* x (x) float64 1.272e+06 1.272e+06 ... 1.436e+06 1.436e+06
spatial_ref int32 3577
Attributes:
units: 1
nodata: -999
crs: EPSG:3577
grid_mapping: spatial_ref
As you can see, the number of tasks has dropped and our chunks are larger at 36 MiB.
There is a small sliver along the bottom because the chunk size isn't an even fit for the actual array size. Let's expand our chunk size in that direction slightly to give a better fit.
import math
sliver = dataset.dims['y'] - (2 * 3072)
print(f'Y-dim sliver: {sliver}')
y_chunk = 3072 + math.ceil(sliver/2.)
print(f'Y-dim chunk: {y_chunk}')
Y-dim sliver: 297 Y-dim chunk: 3221
dataset = None # clear results from any previous runs
dataset = dc.load(
product=products,
x=study_area_lon,
y=study_area_lat,
time=set_time,
measurements=measurements,
resampling=resampling,
output_crs=set_crs,
resolution=set_resolution,
dask_chunks = {"time":2, "x":3072, "y":y_chunk},
group_by=group_by,
)
dataset[red]
<xarray.DataArray 'nbart_red' (time: 43, y: 6441, x: 5469)>
dask.array<rechunk-merge, shape=(43, 6441, 5469), dtype=int16, chunksize=(2, 3221, 3072), chunktype=numpy.ndarray>
Coordinates:
* time (time) datetime64[ns] 2020-02-08T23:56:31.774346 ... 2021-01...
* y (y) float64 -3.922e+06 -3.922e+06 ... -4.115e+06 -4.115e+06
* x (x) float64 1.272e+06 1.272e+06 ... 1.436e+06 1.436e+06
spatial_ref int32 3577
Attributes:
units: 1
nodata: -999
crs: EPSG:3577
grid_mapping: spatial_ref
Notice how that small change to remove the sliver only marginally increased our chunk memory usage but reduced the number of chunks by a third.
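The sliver fix above can be generalised: pick the number of chunks whose size is closest to your target, then divide the dimension evenly. A small sketch, where `even_chunk` is a hypothetical helper name, not a dask or datacube function:

```python
import math

def even_chunk(dim, target):
    """Chunk size near `target` that splits `dim` into equal-ish pieces (no sliver)."""
    n = max(1, round(dim / target))  # number of chunks closest to the target size
    return math.ceil(dim / n)

print(even_chunk(6441, 3072))  # → 3221: two chunks of 3221 and 3220, as above
print(even_chunk(5469, 3072))  # → 2735: two near-equal chunks for the x dimension
```

The same helper could be applied to the x dimension too; we keep the notebook's hand-computed values to stay close to the worked example.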
Let's see what this does to our performance. We need to re-run the code to update all the intermediate variables in our calculation and call compute()
# Identify pixels that are either "valid", "water" or "snow"
cloud_free_mask = masking.make_mask(dataset[qa_band], **qa_mask)
# Apply the mask
cloud_free = dataset.where(cloud_free_mask)
# Calculate the components that make up the NDVI calculation
band_diff = cloud_free[nir] - cloud_free[red]
band_sum = cloud_free[nir] + cloud_free[red]
# Calculate NDVI, ta da
ndvi = None
ndvi = band_diff / band_sum
ndvi_unweighted = ndvi.groupby("time.season").mean("time") # Calculate the seasonal mean
%%time
actual_result = ndvi_unweighted.compute()
CPU times: user 1.35 s, sys: 688 ms, total: 2.04 s Wall time: 1min 6s
You will notice the time for task graph optimization - the delay between executing the cell above and seeing processing in the cluster dashboard - is down significantly. Fewer tasks means less time in optimization. We've decreased the computation time as well.
There is one more thing we can do before we "Go Big". We've done this before and it's simple enough: save dask the work of figuring out which measurements we aren't using by telling it to load only the ones we do use. Let's add our measurements list.
measurements = [qa_band, red, nir]
dataset = None # clear results from any previous runs
dataset = dc.load(
product=products,
x=study_area_lon,
y=study_area_lat,
time=set_time,
measurements=measurements,
resampling=resampling,
output_crs=set_crs,
resolution=set_resolution,
dask_chunks = {"time":2, "x":3072, "y":y_chunk},
group_by=group_by,
)
# Identify pixels that are either "valid", "water" or "snow"
cloud_free_mask = masking.make_mask(dataset[qa_band], **qa_mask)
# Apply the mask
cloud_free = dataset.where(cloud_free_mask)
# Calculate the components that make up the NDVI calculation
band_diff = cloud_free[nir] - cloud_free[red]
band_sum = cloud_free[nir] + cloud_free[red]
# Calculate NDVI, ta da
ndvi = None
ndvi = band_diff / band_sum
ndvi_unweighted = ndvi.groupby("time.season").mean("time") # Calculate the seasonal mean
%%time
actual_result = ndvi_unweighted.compute()
CPU times: user 1.59 s, sys: 732 ms, total: 2.32 s Wall time: 1min 1s
This didn't make too much difference to the computation time, but it has shortened the task graph optimisation phase a little more. That time isn't prohibitive in this example, but as we "Go Big" it would be.
Disconnecting your client is good practice, but the cluster will still be up, so we need to shut it down as well.
client.close()
cluster.shutdown()